
Add workflow_streams samples #300

Open
jssmith wants to merge 8 commits into temporalio:main from jssmith:workflow-streams-samples

Conversation

jssmith (Contributor) commented Apr 30, 2026

Summary

Adds samples for temporalio.contrib.workflow_streams, the workflow-hosted durable event stream contrib (experimental, contrib/pubsub branch of sdk-python).

Four scenarios under workflow_streams/, each exercising more of the API:

  • order_workflow — minimal publisher driven by an activity, with two typed topics on a single stream
  • pipeline_workflow — reconnecting subscriber across activity steps
  • hub_workflow — external publisher driving a long-lived workflow
  • ticker_workflow — truncating publisher with a bounded ring buffer

Workflows bind typed topic handles in @workflow.init and publish via topic.publish(value). Subscribers iterate from a WorkflowStreamClient. A small race_with_workflow helper races the consumer against handle.result() so a workflow failure surfaces as an exception rather than blocking the subscriber forever.
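The helper itself isn't reproduced in this description; a minimal sketch of the racing shape in plain asyncio (the signature and names below are illustrative, not the sample's actual code):

```python
import asyncio
from typing import Awaitable, TypeVar

T = TypeVar("T")

async def race_with_workflow(consume: Awaitable[T], workflow_result: Awaitable[object]) -> T:
    """Run the consumer alongside the workflow's result future.

    If the workflow fails before the consumer finishes, re-raise the
    workflow's exception instead of leaving the consumer blocked forever.
    """
    consumer = asyncio.ensure_future(consume)
    result = asyncio.ensure_future(workflow_result)
    done, _ = await asyncio.wait({consumer, result}, return_when=asyncio.FIRST_COMPLETED)
    if result in done and result.exception() is not None:
        consumer.cancel()
        raise result.exception()  # workflow failure surfaces to the subscriber
    if result not in done:
        result.cancel()  # consumer finished first; stop watching the workflow
    return await consumer  # workflow succeeded (or consumer won): drain the consumer
```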

This is one half of #299, split out so the workflow_streams basics can land independently of the openai_agents streaming sample (separate PR).

Test plan

  • Build sdk-python from contrib/pubsub and install into the samples uv environment
  • uv run workflow_streams/run_worker.py and run each run_*.py starter; verify expected output
  • Reconnecting subscriber: kill and restart mid-run, confirm resume from prior offset
  • Truncating ticker: confirm ring-buffer drops oldest items as expected

jssmith and others added 5 commits April 29, 2026 20:27
Initial samples directory for temporalio.contrib.workflow_streams,
the workflow-hosted durable event stream contrib (experimental,
contrib/pubsub branch of sdk-python).

The order_workflow scenario covers the basic publisher path: a
workflow binds a typed topic in @workflow.init, an activity
publishes events via the topic handle, and a starter subscribes
with WorkflowStreamClient and prints events as they arrive.

Also enables the uv supply-chain cooldown options in the lockfile.
Adds a second scenario demonstrating the central Workflow Streams use
case: a consumer disconnects mid-stream and resumes later via
subscribe(from_offset=...), with no events lost or duplicated. The
existing OrderWorkflow finishes too quickly to make the pattern
visible, so this introduces a multi-stage PipelineWorkflow paced with
workflow.sleep between stages.

The runner reads a couple of events, persists item.offset + 1 to a
temp file, sleeps to simulate a disconnect while the workflow keeps publishing,
then opens a fresh Client + WorkflowStreamClient and resumes from the
persisted offset — the same shape that works across actual process
restarts.
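Since the contrib's subscribe surface isn't shown here, the persist-and-resume shape can be sketched against an in-memory stand-in (`FakeStream` and every other name below are illustrative, not the contrib's API):

```python
import json
import os
from dataclasses import dataclass

@dataclass
class Item:
    offset: int
    data: str

class FakeStream:
    """In-memory stand-in for a durable stream with subscribe(from_offset=...)."""
    def __init__(self, events):
        self.events = events

    def subscribe(self, from_offset=0):
        for i, data in enumerate(self.events):
            if i >= from_offset:
                yield Item(offset=i, data=data)

def consume_some(stream, state_path, limit):
    """Read up to `limit` items, persisting offset + 1 after each one so a
    later call (or a restarted process) resumes with no loss or duplication."""
    start = 0
    if os.path.exists(state_path):
        with open(state_path) as f:
            start = json.load(f)["next_offset"]
    seen = []
    for item in stream.subscribe(from_offset=start):
        seen.append(item.data)
        with open(state_path, "w") as f:
            json.dump({"next_offset": item.offset + 1}, f)
        if len(seen) >= limit:
            break
    return seen
```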

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds a third scenario covering the third publisher shape: a backend
service or scheduled job pushing events into a workflow it didn't
itself start. The earlier scenarios publish either from inside the
workflow or from one of its activities; this one uses
WorkflowStreamClient.create() externally.

HubWorkflow is a passive stream host — it does no work of its own and
just waits to be told to close, fitting the event-bus pattern. The
runner publishes a series of news headlines, runs a subscriber task
alongside, signals close, and exits when both tasks complete.
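The flow reduces to a familiar asyncio shape. In this stand-in sketch an asyncio.Queue plays the hosted stream and an in-band sentinel plays the close signal; none of the names below are the contrib's API:

```python
import asyncio

DONE = object()  # in-band terminator standing in for the close signal

async def run_hub_demo():
    queue: asyncio.Queue = asyncio.Queue()  # stand-in for the hosted stream

    async def external_publisher():
        # A backend service pushing events into a stream it didn't start.
        for headline in ["markets open", "rates hold", "markets close"]:
            await queue.put(headline)
        await queue.put(DONE)  # publish the sentinel, then "signal close"

    async def subscriber(received):
        while True:
            item = await queue.get()
            if item is DONE:
                break
            received.append(item)

    received: list = []
    await asyncio.gather(external_publisher(), subscriber(received))
    return received
```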

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds a fourth scenario for long-running workflows that need to bound
their event log: the workflow publishes events at a fixed cadence and
calls self.stream.truncate(...) periodically to keep only the most
recent entries.

The runner subscribes twice — fast and slow — to make the trade-off
visible: the fast subscriber sees every offset in order; the slow one
falls behind a truncation, has its iterator transparently jump forward
to the new base offset, and shows the offset gap that intermediate
events fell into. This is the model for high-volume long-running
streams: bounded log size, slow consumers may miss intermediate events
but always see the most recent state.
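The truncation semantics described above can be modeled in a few lines of plain Python (the class and its names are illustrative, not the contrib's types):

```python
class TruncatingLog:
    """Append-only log that keeps only the last `keep_last` entries,
    tracking a base offset so readers can detect the gap they skipped."""
    def __init__(self, keep_last):
        self.keep_last = keep_last
        self.base = 0       # offset of the oldest retained entry
        self.entries = []

    def publish(self, value):
        self.entries.append(value)
        if len(self.entries) > self.keep_last:
            drop = len(self.entries) - self.keep_last
            self.base += drop
            self.entries = self.entries[drop:]

    def read_from(self, offset):
        """Return (effective_offset, items): a reader behind `base`
        transparently jumps forward to the new base offset."""
        start = max(offset, self.base)
        return start, self.entries[start - self.base:]
```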

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…handles

- Directory and module path renamed to plural to match sdk-python
  `temporalio.contrib.workflow_streams` rename.
- Workflow-side: bind a typed topic handle in `@workflow.init` and call
  `topic.publish(value)` — the removed `WorkflowStream.publish` form is
  gone. Same change applied to the activity and external-publisher.
- Activity: `WorkflowStreamClient.from_activity()` →
  `from_within_activity()`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@jssmith jssmith requested a review from a team as a code owner April 30, 2026 03:34
@jssmith jssmith mentioned this pull request Apr 30, 2026
jssmith added 3 commits April 29, 2026 20:49
- README: fix scenario count (two -> four), document subscriber start
  position and continue-as-new semantics for stream_state
- hub_workflow: drop stale comment referencing a README race note
  that does not exist in this sample
- payment_activity: trim long publisher_id/dedup caveat — moved out
  of the first sample's docstring to keep it approachable
…be shape

End-to-end runs of the four workflow_streams scenarios surfaced two
sample-side issues, both fixed here.

run_publisher's consumer asserted ``isinstance(item.data, Payload)`` and
called ``payload_converter.from_payload(item.data, T)``. The contrib's
``subscribe()`` defaults to converter-decoded data, not raw payloads,
so this assertion fired on the first run. Switch to
``result_type=RawValue`` (the documented escape hatch for heterogeneous
topics) and read ``item.data.payload``.

Items published in the same workflow task that returns from
``@workflow.run`` were not delivered to subscribers — the in-memory
log dies with the workflow and the next subscriber poll lands on a
completed workflow. Fix: each scenario now uses an in-band terminator
that subscribers break on, and each workflow holds the run open with
``await workflow.sleep(timedelta(milliseconds=500))`` so the final
publish is fetched before the workflow exits:

- OrderWorkflow / PipelineWorkflow: the workflow's own
  ``StatusEvent(kind="complete")`` / ``StageEvent(stage="complete")``
  is the terminator (consumers already broke on it).
- HubWorkflow: the *publisher* in run_external_publisher emits a
  sentinel ``NewsEvent(headline="__done__")`` immediately before
  signaling close; the consumer breaks on the sentinel.
- TickerWorkflow: the final tick (n == count - 1) is the terminator;
  ``keep_last`` guarantees that offset survives the last truncation,
  so even slow consumers reach it.
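A consumer-side sketch of the break-on-terminator pattern, using the ``StatusEvent`` shape from the first scenario (the async iterator here is a stub, not the contrib's subscribe call):

```python
import asyncio
from dataclasses import dataclass

@dataclass
class StatusEvent:
    kind: str

async def consume_until_complete(stream):
    """Drain events until the in-band terminator, so no subscriber poll
    is still in flight when the workflow run returns."""
    seen = []
    async for event in stream:
        if event.kind == "complete":
            break  # terminator: stop polling before the workflow exits
        seen.append(event)
    return seen

async def fake_stream():
    # Stub for the subscriber's async iterator.
    for event in [StatusEvent("created"), StatusEvent("paid"), StatusEvent("complete")]:
        yield event
```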

Because subscribers stop polling on the terminator, by the time
``workflow.run`` returns there are no in-flight poll handlers — no
``UnfinishedUpdateHandlersWarning`` from the SDK and no need for
``detach_pollers()`` / ``wait_condition(all_handlers_finished)`` in
the workflow exit path.

Two consecutive end-to-end runs of all four scenarios pass cleanly
against ``temporal server start-dev --headless``.
Subscribers don't exit on their own when the host workflow completes —
they need an in-band terminator, and the workflow needs to hold the run
open briefly so the final publish is fetched before run() returns. Both
pieces show up in every scenario here, so document them in one place
and update scenario 3's description to mention the sentinel headline
the publisher emits.
Comment on lines +33 to +35
# Hold the run open briefly so subscribers' final poll
# delivers any items still in the log.
await workflow.sleep(timedelta(milliseconds=500))
Are there more dependable ways of checking if the log is not empty?

jssmith (Contributor, Author):

Sometimes, but not always. We are not just checking that the log has been read; we need to know that the application is done reading before we end the workflow. Real applications may want to leave the workflow open for a longer period of time.
